package com.demo.sync;

import org.apache.rocketmq.client.consumer.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Demo of a clustering-mode (load-balanced) RocketMQ pull consumer built on
 * {@link MQPullConsumerScheduleService}.
 *
 * <p>A {@link PullTaskCallback} is registered for topic {@code sync_TopicTest}. Each
 * invocation of the callback pulls one batch from the given queue, prints the message
 * bodies, stores the next offset to pull from and requests the delay before the next
 * invocation.
 */
public class PullConsumerInBalance {
    /** Charset used to decode message bodies for printing. */
    private static final Charset BODY_CHARSET = StandardCharsets.UTF_8;

    /** Maximum number of messages requested per pull. */
    private static final int PULL_BATCH_SIZE = 32;

    /** Delay, in milliseconds, requested before the next pull task is run. */
    private static final int NEXT_PULL_DELAY_MILLIS = 100;

    /**
     * Configures the schedule service, registers the pull callback and starts it.
     *
     * @param args unused
     * @throws MQClientException if the schedule service fails to start
     */
    public static void main(String[] args) throws MQClientException {
        // Consumer group
        final MQPullConsumerScheduleService scheduleService =
                new MQPullConsumerScheduleService("sync_producer_group_test1");
        // Name server address
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("localhost:9876");
        // Clustering (load-balanced) message model
        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        // Size of the thread pool that runs the callback (default is 20)
        scheduleService.setPullThreadNums(5);
        // Topic whose messages should be processed
        scheduleService.registerPullTaskCallback("sync_TopicTest", new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                try {
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    // A negative value means there is no usable stored offset; start at 0.
                    if (offset < 0) {
                        offset = 0;
                    }
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, "*", offset, PULL_BATCH_SIZE);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            printMessages(pullResult.getMsgFoundList());
                            break;
                        case NO_MATCHED_MSG:
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                        default:
                            // Nothing to print for these statuses.
                            break;
                    }
                    // Store the next offset regardless of status so the next pull
                    // continues from where the broker told us to.
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    context.setPullNextDelayTimeMillis(NEXT_PULL_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently consuming it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e) {
                    // Best effort: report the failure and let the task be scheduled again.
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }

    /**
     * Prints the body of every given message to standard output, prefixed with the
     * name of the current thread.
     *
     * @param messages messages returned by a pull whose status was {@code FOUND}
     */
    private static void printMessages(List<MessageExt> messages) {
        String threadName = Thread.currentThread().getName();
        for (MessageExt message : messages) {
            // Thread name and body are passed as arguments, never as part of the
            // format string, so a '%' in either cannot break printf.
            System.out.printf("%s Receive New Messages: %s%n",
                    threadName, new String(message.getBody(), BODY_CHARSET));
        }
    }
}
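/*
On every scheduling round the queues assigned to this consumer by load balancing are fetched from the remote side, and each queue is handed to the callback; by default the callback runs on a thread pool.
The thread pool size should be chosen according to the actual number of queues.
Use pullBlockIfNotFound to keep threads from busy-polling. It blocks (10s by default) when the queue has nothing to return, so one empty queue can tie up a thread and leave other queues with no thread to serve them; therefore make sure the number of threads is greater than or equal to the number of queues.
 */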